package com.kool.kmqtt.server.task;

import com.alibaba.fastjson.JSON;
import com.kool.kmqtt.server.log.ClientNoAckSendPacketCnt;
import com.kool.kmqtt.server.log.NoAckSendPacketLog;
import com.kool.kmqtt.server.log.SessionCntLog;
import com.kool.kmqtt.server.log.SubscriptionsLog;
import com.kool.kmqtt.service.TopicConstant;
import com.kool.kmqtt.server.repository.Repository;
import com.kool.kmqtt.server.repository.RepositoryFactory;
import com.kool.kmqtt.server.repository.subscription.Subscription;
import com.kool.kmqtt.server.session.SessionHolder;
import com.kool.kmqtt.service.KafkaProvider;
import com.kool.kmqtt.util.DateUtil;
import com.kool.kmqtt.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author : luyu
 * @date :2021/3/31 20:33
 */
@Slf4j
public class LogScheduler {
    /**
     * 调度线程池
     */
    private ScheduledExecutorService scheduledExecutorService = null;
    private KafkaProvider kafkaProvider = null;

    /**
     * 日志推送定时任务调度
     */
    public void schedule() {
        scheduledExecutorService = Executors.newScheduledThreadPool(3);
        kafkaProvider = SpringUtil.getBean(KafkaProvider.class);

        //每分钟发会话上下文日志
        scheduledExecutorService.scheduleAtFixedRate(this::sessionLog, 0, 1, TimeUnit.MINUTES);

        //计算下一个00:00到现在的时差
        long delaySubscriptionLog = DateUtil.getDateStart(DateUtil.getDateAfter(new Date(), 1)).getTime() - System.currentTimeMillis();
        //每日00:00发订阅日志
        scheduledExecutorService.scheduleAtFixedRate(this::subscriptionLog, delaySubscriptionLog, 86400000L, TimeUnit.MILLISECONDS);

        //计算下一个00:30到现在的时差
        long delayNoAckSendPacketLog = delaySubscriptionLog + 1800000L;
        //每日00:30发出站未确认消息数
        scheduledExecutorService.scheduleAtFixedRate(this::noAckSendPacketLog, delayNoAckSendPacketLog, 86400000L, TimeUnit.MILLISECONDS);
    }

    /**
     * 每分钟发会话上下文日志
     * 当前多少个会话
     */
    public void sessionLog() {
        try {
            log.info("每分钟发会话上下文日志");
            int cnt = SessionHolder.getInstance().countSession();
            SessionCntLog sessionCntLog = new SessionCntLog();
            sessionCntLog.setTimestamp(DateUtil.dateString(new Date()));
            sessionCntLog.setCnt(cnt);
            kafkaProvider.sendToKafka(TopicConstant.TOPIC_SUFFIX_SESSION_LOG, JSON.toJSONString(sessionCntLog));
        } catch (Exception e) {
            log.error("每分钟发会话上下文日志异常！");
            log.error(e.getMessage(), e);
        }
    }

    /**
     * 每日发订阅日志
     */
    public void subscriptionLog() {
        try {
            log.info("每日发订阅日志");
            Repository repository = RepositoryFactory.getRepository();
            List<Subscription> subscriptions = repository.getSubscriptions();
            SubscriptionsLog subscriptionsLog = new SubscriptionsLog();
            subscriptionsLog.setTimestamp(DateUtil.dateString(new Date()));
            subscriptionsLog.setSubscriptions(subscriptions);
            kafkaProvider.sendToKafka(TopicConstant.TOPIC_SUFFIX_SUBSCRIPTION_LOG, JSON.toJSONString(subscriptionsLog));
        } catch (Exception e) {
            log.error("每日发订阅日志异常！");
            log.error(e.getMessage(), e);
        }
    }

    /**
     * 每日发出站未确认消息数
     */
    public void noAckSendPacketLog() {
        try {
            log.info("每日发出站未确认消息数");
            Repository repository = RepositoryFactory.getRepository();
            List<ClientNoAckSendPacketCnt> clientNoAckSendPacketCnts = repository.countSendPacket();
            NoAckSendPacketLog noAckSendPacketLog = new NoAckSendPacketLog();
            noAckSendPacketLog.setTimestamp(DateUtil.dateString(new Date()));
            noAckSendPacketLog.setClientNoAckSendPacketCnts(clientNoAckSendPacketCnts);
            kafkaProvider.sendToKafka(TopicConstant.TOPIC_SUFFIX_NO_ACK_SEND_PACKET_LOG, JSON.toJSONString(noAckSendPacketLog));
        } catch (Exception e) {
            log.error("每日发出站未确认消息数异常！");
            log.error(e.getMessage(), e);
        }
    }
}
